{
  "metadata" : {
    "name" : "Spark Dataset 101",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "2015-01-10T00:02:12.659Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "3286C8A0347A477D8E22F88DA1D0FFA6"
    },
    "cell_type" : "markdown",
    "source" : "<style>\n  h1, h2, h3, h4, h5, p, ul, li {\n    color: #2C475C;\n  }\n  .output_html {\n    color: skyblue;\n  }\n  hr { height: 2px; color: lightblue; }\n</style>"
  }, {
    "metadata" : {
      "id" : "9EEE3EB58183458BB2479F7A3FAF3D3E"
    },
    "cell_type" : "markdown",
    "source" : "# Spark 101"
  }, {
    "metadata" : {
      "id" : "6CCFE8BABF35474EBFAA98B3B7E363DB"
    },
    "cell_type" : "markdown",
    "source" : "### First create a dataset using the local `syslog` file"
  }, {
    "metadata" : {
      "id" : "24F5B7D4E1044BE58E625B8780013EDD"
    },
    "cell_type" : "markdown",
    "source" : "We will \n\n*  load the file\n*  convert each line keeping its size\n*  remove the duplicates"
  }, {
    "metadata" : {
      "id" : "B379E68F93F24C2683017D93A1F47ED1"
    },
    "cell_type" : "markdown",
    "source" : "For that, we'll use the `sparkContext`, which\n\n* is the driver\n* can define job (read inputs, transform, group, etc)\n* constructs DAG\n* schedules tasks on the cluster"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "501427AA0C4340A48A256656D33F84F6"
    },
    "cell_type" : "code",
    "source" : "val dta:Dataset[Int] = sparkSession.read.text(\"/var/log/syslog\")\n                                         .map(_.getAs[String](0).size)\n                                         .distinct",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "DD01EFA449E249A98CCF450F18CF771F"
    },
    "cell_type" : "code",
    "source" : "dta.rdd",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "EE0CCD993F96400C83032844B477C5FF"
    },
    "cell_type" : "markdown",
    "source" : "**MapPartitionsRDD** is actually an instance of `Dataset[Int]` that will contain the distinct sizes of the lines."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "4F234A665DD54E0C8E907D0980F56F8A"
    },
    "cell_type" : "code",
    "source" : "dta",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "49366D195C1245459983B44D9DF17A88"
    },
    "cell_type" : "markdown",
    "source" : "_Note_: there is NO computations happening! → [see UI](http://localhost:4040/stages/)"
  }, {
    "metadata" : {
      "id" : "014E031436A34DCB8C8CD949AC7A99FC"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "C9232A6BFE5740DCAB36138941ED22F2"
    },
    "cell_type" : "markdown",
    "source" : "### Now we can use the size for fancy operations like grouping per last digit"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "FAC726F0D5E24C958FEC4FC9C0AD6785"
    },
    "cell_type" : "code",
    "source" : "val rdd1 = dta.groupBy($\"value\" % 10).count",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "640BBF66B5E24AF08C61196B09BEF7B0"
    },
    "cell_type" : "markdown",
    "source" : "### But we can also get rid of even sizes (... non trivially...), then _tupling_ with some other computations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0D59266E317E42748C034EFC82DCBBA7"
    },
    "cell_type" : "code",
    "source" : "val rdd2 = dta.map(_ + 1)\n              .filter(_ % 2 == 0)\n              .map(x => (x%10, x*x))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "4DF7B2BE91844E878415F1A5B1303CBB"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "6A59CE16E29A4E508EB18A0A44699227"
    },
    "cell_type" : "markdown",
    "source" : "### We can combine distributed datasets into single ones, by _joining_ them for instance."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "57EC95B373854F27B94D505073C8CDB7"
    },
    "cell_type" : "code",
    "source" : "val joined = rdd1.join(rdd2)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "A39C410D44D345D588ED8A8F91A3CA4E"
    },
    "cell_type" : "markdown",
    "source" : "_Note (again)_: still nothing done on the cluster up to here → [see ui](http://localhost:4040/stages/)"
  }, {
    "metadata" : {
      "id" : "E31FD0D4451D43EEB69D3593A39D7C5B"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "5C0C053691F44C018FEEAD701C8BF6E2"
    },
    "cell_type" : "markdown",
    "source" : "#### Now we ask the cluster to do the whole thing: Action"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "68F6303C7842460F82B5450EDA35E9BE"
    },
    "cell_type" : "code",
    "source" : "joined.take(10).toList.mkString(\"\\n\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "2EAA6D6613694CB2864E88ED9CB07714"
    },
    "cell_type" : "markdown",
    "source" : "_Note (yeah)_: NOW there were some computations in the cluster → [see stages](http://localhost:4040/stages/) and [see tasks](http://localhost:4040/stages/stage/?id=3&attempt=0)"
  }, {
    "metadata" : {
      "id" : "8B849E0E2E00400EA04BF2E2C4FFBE60"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "01770279903E47D599A76D0DF6EDC64C"
    },
    "cell_type" : "markdown",
    "source" : "## But what just happened?"
  }, {
    "metadata" : {
      "id" : "800BAB4FCD924896892A536CE06E47F5"
    },
    "cell_type" : "markdown",
    "source" : "### First Spark created a DAG based on the job definition"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "C6175DEAF8E74FB889421FB91DA2DBB4"
    },
    "cell_type" : "code",
    "source" : "joined.rdd.toDebugString",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "972FC0011A894D3282E0C75872AC1431"
    },
    "cell_type" : "code",
    "source" : "joined.explain",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "16286C36100042AA9CF1243E1D66AE45"
    },
    "cell_type" : "markdown",
    "source" : "### Then it scheduled it to the executors in the cluster <small>only one when running in local mode<small>"
  }, {
    "metadata" : {
      "id" : "0CFC46E6DFBD45F4A06B44CA41FCAD99"
    },
    "cell_type" : "markdown",
    "source" : "We can check the <strong>Total tasks</strong> activity in the [UI](http://localhost:4040/executors/)"
  }, {
    "metadata" : {
      "id" : "D4EF374579DC4EE98D63ED7FC4A31686"
    },
    "cell_type" : "markdown",
    "source" : "-------"
  }, {
    "metadata" : {
      "id" : "8D078825D489455F844CC980548B4D00"
    },
    "cell_type" : "markdown",
    "source" : "## Now we will prepare the dataset and then using it several times"
  }, {
    "metadata" : {
      "id" : "CBF34A436CE1415080B6A9559BD441BF"
    },
    "cell_type" : "markdown",
    "source" : "So we'll read a file about stock price per day, so let's create a type holding relevant data."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "FD86AD48142E47BDABD6581832DE1720"
    },
    "cell_type" : "code",
    "source" : "case class Quote(stock:String, date:String, price:Double) extends java.io.Serializable",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "7E472DE06AFB4BC98B33F10FC84176D6"
    },
    "cell_type" : "markdown",
    "source" : "The file will contain lines like:\n``` \nASTE,2011-12-06,33.93\nASTE,2012-03-14,36.84\n```"
  }, {
    "metadata" : {
      "id" : "7CAF9FFB8BCC48F78775AADC22C5845F"
    },
    "cell_type" : "markdown",
    "source" : "Let's download the data first"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "5442C20E9CA3425FB6AF67F1BF654BDF"
    },
    "cell_type" : "code",
    "source" : "import scala.language.postfixOps\nimport sys.process._\n\"mkdir -p /tmp/data\"!!\n\nif (!new java.io.File(\"/tmp/data/closes.csv\").exists)\n  \"wget https://s3-eu-west-1.amazonaws.com/spark-notebook-data/closes.csv -O /tmp/data/closes.csv\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "86FF7855FF21418CB986184EFC62D2D9"
    },
    "cell_type" : "code",
    "source" : ":sh du -h /tmp/data/closes.csv",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "433B1BE1A3A34E1A8B15AD14E4AA6D1B"
    },
    "cell_type" : "markdown",
    "source" : "## Read as CSV"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7774F17B6C35447D8E85908C313DC347"
    },
    "cell_type" : "code",
    "source" : "val closesDf/*:DataFrame*/ = sparkSession.read.format(\"com.databricks.spark.csv\").load(\"/tmp/data/closes.csv\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "8D47CD8C1DF94F158CAC8EE7C6ACAEFC"
    },
    "cell_type" : "code",
    "source" : "closesDf",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "E45A01F323DD43ED81BD6A65804FD2F3"
    },
    "cell_type" : "markdown",
    "source" : "## Read as Quotes"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "04CE294385F84B2F90CE8CDC352405DD"
    },
    "cell_type" : "code",
    "source" : "val closes/*:Dataset[Quote]*/ = closesDf.toDF(\"symbol\", \"date\", \"price\")\n                                        .withColumn(\"price\", $\"price\".cast(types.DoubleType))\n                                        .map{ case row => \n                                          Quote(\n                                            row.getAs[String](\"symbol\"), \n                                            row.getAs[String](\"date\"), \n                                            row.getAs[Double](\"price\"))\n                                        }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8FC05B372BED4A058E78E37476958247"
    },
    "cell_type" : "markdown",
    "source" : "We have date, so we can group stock prices per day"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "972314299E0647BABDE4B73CBE291144"
    },
    "cell_type" : "code",
    "source" : "val byDate = closes.map(x => (x.date, x))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "D4190DFDC8454C24B19B524D21F1F1A9"
    },
    "cell_type" : "markdown",
    "source" : "Now we can compute the minimum stocks per date"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "DE3B6FC244614BB981A4CF922A9799C7"
    },
    "cell_type" : "code",
    "source" : "def minByDate = byDate.rdd.combineByKey[(String, Double)](                                                                                          \n                        (x:Quote) => (x.stock, x.price), \n                        (d:(String, Double), l:Quote) => if (d._2 < l.price) d else (l.stock, l.price),\n                        (d1:(String, Double), d2:(String, Double)) => if (d1._2 < d2._2) d1 else d2\n                      )",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "16CC10FEEC1D47E78B3226B266EFAF27"
    },
    "cell_type" : "code",
    "source" : "<pre>{minByDate.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "F23BA22F606149F2950E4286C23D528C"
    },
    "cell_type" : "markdown",
    "source" : "It took ~2 seconds (in local[8] and 24G of RAM)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0751E57B3B9E469F8F6F14314900D538"
    },
    "cell_type" : "code",
    "source" : "<pre>{minByDate.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "D981544D3EF64014AA55E7CDF86FE2EE"
    },
    "cell_type" : "markdown",
    "source" : "Once again.... 2 seconds!!!"
  }, {
    "metadata" : {
      "id" : "2D5EB6251F9144BE89326BD831289175"
    },
    "cell_type" : "markdown",
    "source" : "#### Solution: caching!"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "8FEF64A58FD9451F810B367DA2D13F6F"
    },
    "cell_type" : "code",
    "source" : "val maxByDate2 = byDate.rdd.combineByKey[(String, Double)](\n  (x:Quote) => (x.stock, x.price), \n  (d:(String, Double), l:Quote) => if (d._2 > l.price) d else (l.stock, l.price),\n  (d1:(String, Double), d2:(String, Double)) => if (d1._2 > d2._2) d1 else d2\n)\n\nmaxByDate2.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "F33FA62A8F344DC2835A1A1A615C9B93"
    },
    "cell_type" : "markdown",
    "source" : "Ask some data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "4471F4251981457281AAE7158B742105"
    },
    "cell_type" : "code",
    "source" : "<pre>{maxByDate2.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "E938BAFBC4BC469E8BE875131DC693A9"
    },
    "cell_type" : "markdown",
    "source" : "**Go to [UI](http://localhost:4040/storage/)**"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "5A2A4823A9DF45EBA6802F4006749CC9"
    },
    "cell_type" : "code",
    "source" : "<pre>{maxByDate2.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8712B55D99784F04826D7E6FEE2BCD1A"
    },
    "cell_type" : "markdown",
    "source" : "**BLAZING FAST** => Reuses the cache!"
  } ],
  "nbformat" : 4
}